package com.bruce.tool.mq.aliyun.core;

import com.aliyun.openservices.ons.api.Consumer;
import com.bruce.tool.common.exception.BaseRuntimeException;
import com.bruce.tool.mq.aliyun.config.AliMqConfig;
import com.bruce.tool.mq.aliyun.constant.AliMqCode;
import com.bruce.tool.mq.aliyun.constant.AliMqPool;
import com.bruce.tool.mq.aliyun.domain.AliResponse;
import com.bruce.tool.mq.aliyun.handler.AliMqHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * Message receiver.
 *
 * <p>Looks up the configuration and the consumer registered in the
 * {@link AliMqPool} under a given code, resets the consumer's current
 * subscription, wires the caller-supplied {@link AliMqHandler} in, and
 * starts the consumer.
 *
 * @author : Bruce(刘正航) 11:02 2019-02-14
 */
@Component
public class AliReceiver {

    @Autowired
    private AliMqPool pool;

    /**
     * Hands the raw {@link Consumer} to the handler so the caller can perform
     * its own {@code subscribe} call(s); the consumer is started afterwards.
     *
     * @param code    key used to look up the configuration and consumer in the pool
     * @param handler callback that receives the consumer; must not be {@code null}
     * @throws BaseRuntimeException if no configuration or no consumer is registered for {@code code}
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    public void origin(String code, AliMqHandler handler){
        this.execute(code,handler,true);
    }

    /**
     * Subscribes to the configured topic and tags and passes each received
     * message to the handler as an {@link AliResponse}.
     *
     * @param code    key used to look up the configuration and consumer in the pool
     * @param handler callback that receives each message; must not be {@code null}
     * @throws BaseRuntimeException if no configuration or no consumer is registered for {@code code}
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    public void execute(String code, AliMqHandler handler){
        this.execute(code,handler,false);
    }

    /**
     * Shared implementation behind {@link #origin} and {@link #execute(String, AliMqHandler)}.
     *
     * @param code    key used to look up the configuration and consumer in the pool
     * @param handler caller-supplied callback; must not be {@code null}
     * @param isBase  {@code true} to give the handler the raw consumer,
     *                {@code false} to subscribe here and forward messages to the handler
     */
    private void execute(String code, AliMqHandler handler, boolean isBase){
        AliMqConfig config = pool.getConfigs().get(code);
        if(Objects.isNull(config) ){
            throw new BaseRuntimeException(AliMqCode.MQERROR_CONFIG_NULL.getCode(), AliMqCode.MQERROR_CONFIG_NULL.getMessage());
        }

        Consumer consumer = pool.getReceivers().get(code);
        if(Objects.isNull(consumer) ){
            throw new BaseRuntimeException(AliMqCode.MQERROR_RECEIVE_NULL.getCode(), AliMqCode.MQERROR_RECEIVE_NULL.getMessage());
        }

        // Fail fast BEFORE touching the consumer: without this check a null handler
        // would only blow up after the consumer had already been unsubscribed and
        // shut down (origin path), or later on every delivered message (execute path).
        Objects.requireNonNull(handler, "handler");

        // Drop the existing listener on the configured topic before (re)starting.
        consumer.unsubscribe(config.getTopic());
        // Stop the consumer before (re)starting it.
        // NOTE(review): start() is called on this same instance below; confirm the
        // Consumer implementation in use supports start() after shutdown().
        consumer.shutdown();

        if(isBase){
            // The caller chooses its own topic and tags by subscribing on the consumer.
            handler.handle(consumer);
            consumer.start();
            return;
        }

        // Wrap each incoming message's topic, message id and body in an AliResponse
        // and hand it to the caller's handler.
        consumer.subscribe(config.getTopic(), config.getTags(), (message, context) -> handler.handle(
                AliResponse.builder().topic(message.getTopic()).messageId(message.getMsgID()).body(message.getBody()).build()
        ));
        consumer.start();
    }
}
